# uncompyle6 version 3.2.3
# Python bytecode 3.6 (3379)
# Decompiled from: Python 3.6.8 |Anaconda custom (64-bit)| (default, Feb 21 2019, 18:30:04) [MSC v.1916 64 bit (AMD64)]
# Embedded file name: multiprocessing\resource_sharer.py
import os, signal, socket, sys, threading
from . import process
from .context import reduction
from . import util

__all__ = ["stop"]
if sys.platform == "win32":
    __all__ += ["DupSocket"]

    class DupSocket(object):
        """Picklable wrapper for a socket."""

        def __init__(self, sock):
            # Operate on a duplicate so the caller's socket object is not the
            # one shared or closed by the registered callbacks.
            duplicate = sock.dup()

            def _send_share(conn, pid):
                # Produce the share data for process ``pid`` and send it over
                # the connection as raw bytes.
                conn.send_bytes(duplicate.share(pid))

            self._id = _resource_sharer.register(_send_share, duplicate.close)

        def detach(self):
            """Get the socket.  This should only be called once."""
            with _resource_sharer.get_connection(self._id) as conn:
                return socket.fromshare(conn.recv_bytes())


else:
    __all__ += ["DupFd"]

    class DupFd(object):
        """Wrapper for fd which can be used at any time."""

        def __init__(self, fd):
            # Register a duplicate of the descriptor, not the caller's own fd.
            duplicate_fd = os.dup(fd)

            def _send_fd(conn, pid):
                # Hand the duplicated descriptor to process ``pid``.
                reduction.send_handle(conn, duplicate_fd, pid)

            def _close_fd():
                # Release the duplicate once the sharer is done with it.
                os.close(duplicate_fd)

            self._id = _resource_sharer.register(_send_fd, _close_fd)

        def detach(self):
            """Get the fd.  This should only be called once."""
            with _resource_sharer.get_connection(self._id) as conn:
                return reduction.recv_handle(conn)


class _ResourceSharer(object):
    """Manager for resources using a background thread.

    A resource is registered as a ``(send, close)`` pair of callables.  The
    serving thread hands each registered resource out at most once: it pops
    the pair from the cache, calls ``send(conn, pid)`` and then always calls
    ``close()``.
    """

    def __init__(self):
        self._key = 0  # last identifier handed out by register()
        self._cache = {}  # key -> (send, close) for resources not yet sent
        self._old_locks = []  # locks replaced by _afterfork()
        self._lock = threading.Lock()
        self._listener = None
        self._address = None
        self._thread = None
        util.register_after_fork(self, _ResourceSharer._afterfork)

    def register(self, send, close):
        """Register resource, returning an identifier.

        ``send`` is later called as ``send(conn, pid)`` and ``close`` with no
        arguments.  The listener and serving thread are started on the first
        registration.  The returned identifier is ``(address, key)``.
        """
        with self._lock:
            if self._address is None:
                self._start()
            self._key += 1
            self._cache[self._key] = (send, close)
            return (self._address, self._key)

    @staticmethod
    def get_connection(ident):
        """Return connection from which to receive identified resource.

        ``ident`` is an ``(address, key)`` pair as returned by ``register``.
        The request ``(key, os.getpid())`` is sent before the connection is
        returned to the caller.
        """
        from .connection import Client

        address, key = ident
        c = Client(address, authkey=process.current_process().authkey)
        try:
            c.send((key, os.getpid()))
        except BaseException:
            # Do not leak the connection if the request could not be sent.
            c.close()
            raise
        return c

    def stop(self, timeout=None):
        """Stop the background thread and clear registered resources.

        ``timeout`` is passed to ``Thread.join``.  If the thread is still
        alive afterwards a warning is issued through ``util.sub_warning``;
        the listener is closed and the state reset either way.  Does nothing
        when no listener has been started.
        """
        from .connection import Client

        with self._lock:
            if self._address is None:
                return
            authkey = process.current_process().authkey
            # A ``None`` message makes the serving loop exit.  The ``with``
            # block closes the client connection even if send() fails.
            with Client(self._address, authkey=authkey) as c:
                c.send(None)
            self._thread.join(timeout)
            if self._thread.is_alive():
                util.sub_warning("_ResourceSharer thread did not stop when asked")
            self._listener.close()
            self._thread = None
            self._address = None
            self._listener = None
            for _send, close in self._cache.values():
                close()

            self._cache.clear()

    def _afterfork(self):
        """Reset all state; registered via ``util.register_after_fork``.

        Closes every resource still cached and the listener (if any), then
        installs a fresh lock.
        """
        for _send, close in self._cache.values():
            close()

        self._cache.clear()
        # The previous lock is kept referenced instead of being dropped.
        # NOTE(review): presumably because it may still be held at fork time
        # - confirm.
        self._old_locks.append(self._lock)
        self._lock = threading.Lock()
        if self._listener is not None:
            self._listener.close()
        self._listener = None
        self._address = None
        self._thread = None

    def _start(self):
        """Create the listener and start the daemon thread running ``_serve``.

        Raises ``AssertionError`` if a listener already exists.
        """
        from .connection import Listener

        if self._listener is not None:
            raise AssertionError("Already have Listener")
        util.debug("starting listener and thread for sending handles")
        self._listener = Listener(authkey=process.current_process().authkey)
        self._address = self._listener.address
        t = threading.Thread(target=self._serve)
        t.daemon = True
        t.start()
        self._thread = t

    def _serve(self):
        """Accept connections and send out requested resources until stopped.

        Each connection delivers one message: ``None`` ends the loop, while
        ``(key, destination_pid)`` requests the resource registered under
        ``key``.
        """
        if hasattr(signal, "pthread_sigmask"):
            # Block signals in this thread.  signal.valid_signals() (when the
            # running Python provides it) excludes numbers reserved by the
            # platform, which range(1, NSIG) may include.
            valid_signals = getattr(signal, "valid_signals", None)
            if valid_signals is not None:
                to_block = valid_signals()
            else:
                to_block = range(1, signal.NSIG)
            signal.pthread_sigmask(signal.SIG_BLOCK, to_block)
        while True:
            try:
                with self._listener.accept() as conn:
                    msg = conn.recv()
                    if msg is None:
                        break
                    key, destination_pid = msg
                    # pop() so that each resource is sent at most once.
                    send, close = self._cache.pop(key)
                    try:
                        send(conn, destination_pid)
                    finally:
                        close()

            except BaseException:
                # Deliberately catch everything: a failure while serving one
                # request is reported and the loop keeps going.
                if not util.is_exiting():
                    sys.excepthook(*sys.exc_info())


# Module-level sharer instance; the wrapper classes defined in this module
# register with it and fetch their resources through it.
_resource_sharer = _ResourceSharer()
# Public alias listed in __all__: stops the sharer's background thread and
# clears its registered resources.
stop = _resource_sharer.stop
